/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.values;



import com.bff.gaia.unified.sdk.Pipeline;

import com.bff.gaia.unified.sdk.annotations.Internal;

import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.transforms.PTransform;

import com.bff.gaia.unified.sdk.transforms.ParDo;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableMap;



import java.util.Collections;

import java.util.LinkedHashMap;

import java.util.Map;

import java.util.Objects;



/**

 * A {@link PCollectionTuple} is an immutable tuple of heterogeneously-typed {@link PCollection

 * PCollections}, "keyed" by {@link TupleTag TupleTags}. A {@link PCollectionTuple} can be used as

 * the input or output of a {@link PTransform} taking or producing multiple PCollection inputs or

 * outputs that can be of different types, for instance a {@link ParDo} with multiple outputs.

 *

 * <p>A {@link PCollectionTuple} can be created and accessed like follows:

 *

 * <pre>{@code

 * PCollection<String> pc1 = ...;

 * PCollection<Integer> pc2 = ...;

 * PCollection<Iterable<String>> pc3 = ...;

 *

 * // Create TupleTags for each of the PCollections to put in the

 * // PCollectionTuple (the type of the TupleTag enables tracking the

 * // static type of each of the PCollections in the PCollectionTuple):

 * TupleTag<String> tag1 = new TupleTag<>();

 * TupleTag<Integer> tag2 = new TupleTag<>();

 * TupleTag<Iterable<String>> tag3 = new TupleTag<>();

 *

 * // Create a PCollectionTuple with three PCollections:

 * PCollectionTuple pcs =

 *     PCollectionTuple.of(tag1, pc1)

 *                     .and(tag2, pc2)

 *                     .and(tag3, pc3);

 *

 * // Create an empty PCollectionTuple:

 * Pipeline p = ...;

 * PCollectionTuple pcs2 = PCollectionTuple.empty(p);

 *

 * // Get PCollections out of a PCollectionTuple, using the same tags

 * // that were used to put them in:

 * PCollection<Integer> pcX = pcs.get(tag2);

 * PCollection<String> pcY = pcs.get(tag1);

 * PCollection<Iterable<String>> pcZ = pcs.get(tag3);

 *

 * // Get a map of all PCollections in a PCollectionTuple:

 * Map<TupleTag<?>, PCollection<?>> allPcs = pcs.getAll();

 * }</pre>

 */

public class PCollectionTuple implements PInput, POutput {

  /**

   * Returns an empty {@link PCollectionTuple} that is part of the given {@link Pipeline}.

   *

   * <p>A {@link PCollectionTuple} containing additional elements can be created by calling {@link

   * #and} on the result.

   */

  public static PCollectionTuple empty(Pipeline pipeline) {

    return new PCollectionTuple(pipeline);

  }



  /**

   * Returns a singleton {@link PCollectionTuple} containing the given {@link PCollection} keyed by

   * the given {@link TupleTag}.

   *

   * <p>A {@link PCollectionTuple} containing additional elements can be created by calling {@link

   * #and} on the result.

   */

  public static <T> PCollectionTuple of(TupleTag<T> tag, PCollection<T> pc) {

    return empty(pc.getPipeline()).and(tag, pc);

  }



  /**

   * A version of {@link #of(TupleTag, PCollection)} that takes in a String instead of a {@link

   * TupleTag}.

   *

   * <p>This method is simpler for cases when a typed tuple-tag is not needed to extract a

   * PCollection, for example when using schema transforms.

   */

  public static <T> PCollectionTuple of(String tag, PCollection<T> pc) {

    return of(new TupleTag<>(tag), pc);

  }



  /**

   * A version of {@link #of(String, PCollection)} that takes in two PCollections of the same type.

   */

  public static <T> PCollectionTuple of(

      String tag1, PCollection<T> pc1, String tag2, PCollection<T> pc2) {

    return of(tag1, pc1).and(tag2, pc2);

  }



  /**

   * A version of {@link #of(String, PCollection)} that takes in three PCollections of the same

   * type.

   */

  public static <T> PCollectionTuple of(

      String tag1,

      PCollection<T> pc1,

      String tag2,

      PCollection<T> pc2,

      String tag3,

      PCollection<T> pc3) {

    return of(tag1, pc1, tag2, pc2).and(tag3, pc3);

  }



  /**

   * A version of {@link #of(String, PCollection)} that takes in four PCollections of the same type.

   */

  public static <T> PCollectionTuple of(

      String tag1,

      PCollection<T> pc1,

      String tag2,

      PCollection<T> pc2,

      String tag3,

      PCollection<T> pc3,

      String tag4,

      PCollection<T> pc4) {

    return of(tag1, pc1, tag2, pc2, tag3, pc3).and(tag4, pc4);

  }



  /**

   * A version of {@link #of(String, PCollection)} that takes in five PCollections of the same type.

   */

  public static <T> PCollectionTuple of(

      String tag1,

      PCollection<T> pc1,

      String tag2,

      PCollection<T> pc2,

      String tag3,

      PCollection<T> pc3,

      String tag4,

      PCollection<T> pc4,

      String tag5,

      PCollection<T> pc5) {

    return of(tag1, pc1, tag2, pc2, tag3, pc3, tag4, pc4).and(tag5, pc5);

  }



  // To create a PCollectionTuple with more than five inputs, use the and() builder method.



  /**

   * Returns a new {@link PCollectionTuple} that has each {@link PCollection} and {@link TupleTag}

   * of this {@link PCollectionTuple} plus the given {@link PCollection} associated with the given

   * {@link TupleTag}.

   *

   * <p>The given {@link TupleTag} should not already be mapped to a {@link PCollection} in this

   * {@link PCollectionTuple}.

   *

   * <p>Each {@link PCollection} in the resulting {@link PCollectionTuple} must be part of the same

   * {@link Pipeline}.

   */

  public <T> PCollectionTuple and(TupleTag<T> tag, PCollection<T> pc) {

    if (pc.getPipeline() != pipeline) {

      throw new IllegalArgumentException("PCollections come from different Pipelines");

    }



    return new PCollectionTuple(

        pipeline,

        new ImmutableMap.Builder<TupleTag<?>, PCollection<?>>()

            .putAll(pcollectionMap)

            .put(tag, pc)

            .build());

  }



  /**

   * A version of {@link #and(TupleTag, PCollection)} that takes in a String instead of a TupleTag.

   *

   * <p>This method is simpler for cases when a typed tuple-tag is not needed to extract a

   * PCollection, for example when using schema transforms.

   */

  public <T> PCollectionTuple and(String tag, PCollection<T> pc) {

    return and(new TupleTag<>(tag), pc);

  }



  /**

   * Returns whether this {@link PCollectionTuple} contains a {@link PCollection} with the given

   * tag.

   */

  public <T> boolean has(TupleTag<T> tag) {

    return pcollectionMap.containsKey(tag);

  }



  /**

   * Returns whether this {@link PCollectionTuple} contains a {@link PCollection} with the given

   * tag.

   */

  public <T> boolean has(String tag) {

    return has(new TupleTag<>(tag));

  }



  /**

   * Returns the {@link PCollection} associated with the given {@link TupleTag} in this {@link

   * PCollectionTuple}. Throws {@link IllegalArgumentException} if there is no such {@link

   * PCollection}, i.e., {@code !has(tag)}.

   */

  public <T> PCollection<T> get(TupleTag<T> tag) {

    @SuppressWarnings("unchecked")

    PCollection<T> pcollection = (PCollection<T>) pcollectionMap.get(tag);

    if (pcollection == null) {

      throw new IllegalArgumentException("TupleTag not found in this PCollectionTuple tuple");

    }

    return pcollection;

  }



  /**

   * Returns the {@link PCollection} associated with the given tag in this {@link PCollectionTuple}.

   * Throws {@link IllegalArgumentException} if there is no such {@link PCollection}, i.e., {@code

   * !has(tag)}.

   */

  public <T> PCollection<T> get(String tag) {

    return get(new TupleTag<>(tag));

  }



  /**

   * Returns an immutable Map from {@link TupleTag} to corresponding {@link PCollection}, for all

   * the members of this {@link PCollectionTuple}.

   */

  public Map<TupleTag<?>, PCollection<?>> getAll() {

    return pcollectionMap;

  }



  /**

   * Like {@link #apply(String, PTransform)} but defaulting to the name of the {@link PTransform}.

   *

   * @return the output of the applied {@link PTransform}

   */

  public <OutputT extends POutput> OutputT apply(PTransform<? super PCollectionTuple, OutputT> t) {

    return Pipeline.applyTransform(this, t);

  }



  /**

   * Applies the given {@link PTransform} to this input {@link PCollectionTuple}, using {@code name}

   * to identify this specific application of the transform. This name is used in various places,

   * including the monitoring UI, logging, and to stably identify this application node in the job

   * graph.

   *

   * @return the output of the applied {@link PTransform}

   */

  public <OutputT extends POutput> OutputT apply(

      String name, PTransform<? super PCollectionTuple, OutputT> t) {

    return Pipeline.applyTransform(name, this, t);

  }



  /////////////////////////////////////////////////////////////////////////////

  // Internal details below here.



  final Pipeline pipeline;

  final Map<TupleTag<?>, PCollection<?>> pcollectionMap;



  PCollectionTuple(Pipeline pipeline) {

    this(pipeline, new LinkedHashMap<>());

  }



  PCollectionTuple(Pipeline pipeline, Map<TupleTag<?>, PCollection<?>> pcollectionMap) {

    this.pipeline = pipeline;

    this.pcollectionMap = Collections.unmodifiableMap(pcollectionMap);

  }



  /**

   * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>

   *

   * <p>Returns a {@link PCollectionTuple} with each of the given tags mapping to a new output

   * {@link PCollection}.

   *

   * <p>For use by primitive transformations only.

   */

  @Internal

  public static PCollectionTuple ofPrimitiveOutputsInternal(

      Pipeline pipeline,

      TupleTagList outputTags,

      Map<TupleTag<?>, Coder<?>> coders,

      WindowingStrategy<?, ?> windowingStrategy,

      PCollection.IsBounded isBounded) {

    Map<TupleTag<?>, PCollection<?>> pcollectionMap = new LinkedHashMap<>();

    for (TupleTag<?> outputTag : outputTags.tupleTags) {

      if (pcollectionMap.containsKey(outputTag)) {

        throw new IllegalArgumentException("TupleTag already present in this tuple");

      }



      // In fact, `token` and `outputCollection` should have

      // types TypeDescriptor<T> and PCollection<T> for some

      // unknown T. It is safe to create `outputCollection`

      // with type PCollection<Object> because it has the same

      // erasure as the correct type. When a transform adds

      // elements to `outputCollection` they will be of type T.

      @SuppressWarnings("unchecked")

      PCollection outputCollection =

          PCollection.createPrimitiveOutputInternal(

                  pipeline, windowingStrategy, isBounded, (Coder) coders.get(outputTag))

              .setTypeDescriptor(outputTag.getTypeDescriptor());



      pcollectionMap.put(outputTag, outputCollection);

    }

    return new PCollectionTuple(pipeline, pcollectionMap);

  }



  @Override

  public Pipeline getPipeline() {

    return pipeline;

  }



  @Override

  public Map<TupleTag<?>, PValue> expand() {

    return ImmutableMap.copyOf(pcollectionMap);

  }



  @Override

  public void finishSpecifyingOutput(

      String transformName, PInput input, PTransform<?, ?> transform) {

    // All component PCollections will already have been finished. Update their names if

    // appropriate.

    int i = 0;

    for (Map.Entry<TupleTag<?>, PCollection<?>> entry : pcollectionMap.entrySet()) {

      TupleTag<?> tag = entry.getKey();

      PCollection<?> pc = entry.getValue();

      if (pc.getName().equals(PValueBase.defaultName(transformName))) {

        pc.setName(String.format("%s.%s", transformName, tag.getOutName(i)));

      }

      i++;

    }

  }



  @Override

  public boolean equals(Object other) {

    if (!(other instanceof PCollectionTuple)) {

      return false;

    }

    PCollectionTuple that = (PCollectionTuple) other;

    return this.pipeline.equals(that.pipeline) && this.pcollectionMap.equals(that.pcollectionMap);

  }



  @Override

  public int hashCode() {

    return Objects.hash(this.pipeline, this.pcollectionMap);

  }

}